/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.handler.stream;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * A {@link ChannelHandler} that adds support for writing a large data stream
 * asynchronously neither spending a lot of memory nor getting
 * {@link OutOfMemoryError}. Large data streaming such as file transfer requires
 * complicated state management in a {@link ChannelHandler} implementation.
 * {@link ChunkedWriteHandler} manages such complicated states so that you can
 * send a large data stream without difficulties.
 * <p>
 * To use {@link ChunkedWriteHandler} in your application, you have to insert a
 * new {@link ChunkedWriteHandler} instance:
 * 
 * <pre>
 * {@link ChannelPipeline} p = ...;
 * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
 * p.addLast("handler", new MyHandler());
 * </pre>
 * 
 * Once inserted, you can write a {@link ChunkedInput} so that the
 * {@link ChunkedWriteHandler} can pick it up and fetch the content of the
 * stream chunk by chunk and write the fetched chunk downstream:
 * 
 * <pre>
 * {@link Channel} ch = ...;
 * ch.write(new {@link ChunkedFile}(new File("video.mkv"));
 * </pre>
 *
 * <h3>Sending a stream which generates a chunk intermittently</h3>
 *
 * Some {@link ChunkedInput} generates a chunk on a certain event or timing.
 * Such {@link ChunkedInput} implementation often returns {@code null} on
 * {@link ChunkedInput#readChunk(ChannelHandlerContext)}, resulting in the
 * indefinitely suspended transfer. To resume the transfer when a new chunk is
 * available, you have to call {@link #resumeTransfer()}.
 */
public class ChunkedWriteHandler extends ChannelDuplexHandler
{

    private static final InternalLogger logger = InternalLoggerFactory
            .getInstance(ChunkedWriteHandler.class);

    private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();

    private volatile ChannelHandlerContext ctx;

    private PendingWrite currentWrite;

    public ChunkedWriteHandler()
    {
    }

    /**
     * @deprecated use {@link #ChunkedWriteHandler()}
     */
    @Deprecated
    public ChunkedWriteHandler(int maxPendingWrites)
    {
        if (maxPendingWrites <= 0)
        {
            throw new IllegalArgumentException("maxPendingWrites: "
                    + maxPendingWrites + " (expected: > 0)");
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception
    {
        this.ctx = ctx;
    }

    /**
     * Continues to fetch the chunks from the input.
     */
    public void resumeTransfer()
    {
        final ChannelHandlerContext ctx = this.ctx;
        if (ctx == null)
        {
            return;
        }
        if (ctx.executor().inEventLoop())
        {
            try
            {
                doFlush(ctx);
            }
            catch (Exception e)
            {
                if (logger.isWarnEnabled())
                {
                    logger.warn("Unexpected exception while sending chunks.",
                            e);
                }
            }
        }
        else
        {
            // let the transfer resume on the next event loop round
            ctx.executor().execute(new Runnable()
            {

                @Override
                public void run()
                {
                    try
                    {
                        doFlush(ctx);
                    }
                    catch (Exception e)
                    {
                        if (logger.isWarnEnabled())
                        {
                            logger.warn(
                                    "Unexpected exception while sending chunks.",
                                    e);
                        }
                    }
                }
            });
        }
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg,
            ChannelPromise promise) throws Exception
    {
        queue.add(new PendingWrite(msg, promise));
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception
    {
        if (!doFlush(ctx))
        {
            // Make sure to flush at least once.
            ctx.flush();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception
    {
        doFlush(ctx);
        ctx.fireChannelInactive();
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx)
            throws Exception
    {
        if (ctx.channel().isWritable())
        {
            // channel is writable again try to continue flushing
            doFlush(ctx);
        }
        ctx.fireChannelWritabilityChanged();
    }

    private void discard(Throwable cause)
    {
        for (;;)
        {
            PendingWrite currentWrite = this.currentWrite;

            if (this.currentWrite == null)
            {
                currentWrite = queue.poll();
            }
            else
            {
                this.currentWrite = null;
            }

            if (currentWrite == null)
            {
                break;
            }
            Object message = currentWrite.msg;
            if (message instanceof ChunkedInput)
            {
                ChunkedInput<?> in = (ChunkedInput<?>) message;
                try
                {
                    if (!in.isEndOfInput())
                    {
                        if (cause == null)
                        {
                            cause = new ClosedChannelException();
                        }
                        currentWrite.fail(cause);
                    }
                    else
                    {
                        currentWrite.success(in.length());
                    }
                    closeInput(in);
                }
                catch (Exception e)
                {
                    currentWrite.fail(e);
                    logger.warn(ChunkedInput.class.getSimpleName()
                            + ".isEndOfInput() failed", e);
                    closeInput(in);
                }
            }
            else
            {
                if (cause == null)
                {
                    cause = new ClosedChannelException();
                }
                currentWrite.fail(cause);
            }
        }
    }

    private boolean doFlush(final ChannelHandlerContext ctx) throws Exception
    {
        final Channel channel = ctx.channel();
        if (!channel.isActive())
        {
            discard(null);
            return false;
        }

        boolean flushed = false;
        ByteBufAllocator allocator = ctx.alloc();
        while (channel.isWritable())
        {
            if (currentWrite == null)
            {
                currentWrite = queue.poll();
            }

            if (currentWrite == null)
            {
                break;
            }
            final PendingWrite currentWrite = this.currentWrite;
            final Object pendingMessage = currentWrite.msg;

            if (pendingMessage instanceof ChunkedInput)
            {
                final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
                boolean endOfInput;
                boolean suspend;
                Object message = null;
                try
                {
                    message = chunks.readChunk(allocator);
                    endOfInput = chunks.isEndOfInput();

                    if (message == null)
                    {
                        // No need to suspend when reached at the end.
                        suspend = !endOfInput;
                    }
                    else
                    {
                        suspend = false;
                    }
                }
                catch (final Throwable t)
                {
                    this.currentWrite = null;

                    if (message != null)
                    {
                        ReferenceCountUtil.release(message);
                    }

                    currentWrite.fail(t);
                    closeInput(chunks);
                    break;
                }

                if (suspend)
                {
                    // ChunkedInput.nextChunk() returned null and it has
                    // not reached at the end of input. Let's wait until
                    // more chunks arrive. Nothing to write or notify.
                    break;
                }

                if (message == null)
                {
                    // If message is null write an empty ByteBuf.
                    // See https://github.com/netty/netty/issues/1671
                    message = Unpooled.EMPTY_BUFFER;
                }

                ChannelFuture f = ctx.write(message);
                if (endOfInput)
                {
                    this.currentWrite = null;

                    // Register a listener which will close the input once the
                    // write is complete.
                    // This is needed because the Chunk may have some resource
                    // bound that can not
                    // be closed before its not written.
                    //
                    // See https://github.com/netty/netty/issues/303
                    f.addListener(new ChannelFutureListener()
                    {
                        @Override
                        public void operationComplete(ChannelFuture future)
                                throws Exception
                        {
                            currentWrite.progress(chunks.progress(),
                                    chunks.length());
                            currentWrite.success(chunks.length());
                            closeInput(chunks);
                        }
                    });
                }
                else if (channel.isWritable())
                {
                    f.addListener(new ChannelFutureListener()
                    {
                        @Override
                        public void operationComplete(ChannelFuture future)
                                throws Exception
                        {
                            if (!future.isSuccess())
                            {
                                closeInput((ChunkedInput<?>) pendingMessage);
                                currentWrite.fail(future.cause());
                            }
                            else
                            {
                                currentWrite.progress(chunks.progress(),
                                        chunks.length());
                            }
                        }
                    });
                }
                else
                {
                    f.addListener(new ChannelFutureListener()
                    {
                        @Override
                        public void operationComplete(ChannelFuture future)
                                throws Exception
                        {
                            if (!future.isSuccess())
                            {
                                closeInput((ChunkedInput<?>) pendingMessage);
                                currentWrite.fail(future.cause());
                            }
                            else
                            {
                                currentWrite.progress(chunks.progress(),
                                        chunks.length());
                                if (channel.isWritable())
                                {
                                    resumeTransfer();
                                }
                            }
                        }
                    });
                }
            }
            else
            {
                ctx.write(pendingMessage, currentWrite.promise);
                this.currentWrite = null;
            }

            // Always need to flush
            ctx.flush();
            flushed = true;

            if (!channel.isActive())
            {
                discard(new ClosedChannelException());
                break;
            }
        }

        return flushed;
    }

    static void closeInput(ChunkedInput<?> chunks)
    {
        try
        {
            chunks.close();
        }
        catch (Throwable t)
        {
            if (logger.isWarnEnabled())
            {
                logger.warn("Failed to close a chunked input.", t);
            }
        }
    }

    private static final class PendingWrite
    {
        final Object msg;

        final ChannelPromise promise;

        PendingWrite(Object msg, ChannelPromise promise)
        {
            this.msg = msg;
            this.promise = promise;
        }

        void fail(Throwable cause)
        {
            ReferenceCountUtil.release(msg);
            promise.tryFailure(cause);
        }

        void success(long total)
        {
            if (promise.isDone())
            {
                // No need to notify the progress or fulfill the promise because
                // it's done already.
                return;
            }

            if (promise instanceof ChannelProgressivePromise)
            {
                // Now we know what the total is.
                ((ChannelProgressivePromise) promise).tryProgress(total, total);
            }

            promise.trySuccess();
        }

        void progress(long progress, long total)
        {
            if (promise instanceof ChannelProgressivePromise)
            {
                ((ChannelProgressivePromise) promise).tryProgress(progress,
                        total);
            }
        }
    }
}
